Random Forests

Problem Statement
The input data contains surveyed information about potential
customers of a bank. The goal is to build a model that predicts
whether a prospect will become a customer of the bank if
contacted by a marketing campaign.
Techniques used:
1. Random Forests
2. Training and Testing
3. Confusion Matrix
4. Indicator Variables
5. Variable Reduction
# -*- coding: utf-8 -*-

import os

os.chdir("/home/cloudops/spark")
os.getcwd()     # confirm the working directory

# =====================================
# Load the CSV file into an RDD
# The file is semicolon-separated
bankData = sc.textFile("data/bank.csv")
bankData.cache()
bankData.count()     # 542
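
# A quick look at a few raw rows (a sanity check, not part of the original
# flow) confirms the quoting and the ';' delimiter before any parsing:
for line in bankData.take(3):
    print(line)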

# Remove the first line (contains headers)
firstLine = bankData.first()
# print(firstLine)
#    0   1     2         3           4         5
# "age";"job";"marital";"education";"default";"balance";
#    6         7        8      9     10     11
# "housing";"loan";"contact";"day";"month";"duration";
#    12         13      14          15      16
# "campaign";"pdays";"previous";"poutcome";"y"

dataLines = bankData.filter(lambda x: x != firstLine)
dataLines.count()     # 541
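
# Alternative header removal (an illustrative sketch, not part of the original
# flow): drop the first element of the first partition only, instead of
# comparing every row against the header string.
def dropHeader(partitionIndex, iterator):
    it = iter(iterator)
    if partitionIndex == 0:
        next(it, None)              # skip the header line
    return it

# dataLinesAlt = bankData.mapPartitionsWithIndex(dropHeader)
# dataLinesAlt.count()              # should also be 541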

# =====================================
# Convert each row of the RDD into a DenseVector
# 1. Change categorical values to numeric / indicator variables
# Note: use Vectors from pyspark.ml.linalg, not pyspark.mllib.linalg;
#       the old mllib vectors fail in the ml PCA step further down

# from pyspark.mllib.linalg import Vectors

from pyspark.ml.linalg import Vectors

def transformToNumeric(inputStr):

    # remove "", split by ';'
    attList = inputStr.replace("\"","").split(";")

    age = float(attList[0])

    # outcome - convert to float
    outcome = 0.0 if attList[16] == "no" else 1.0

    # marital
    # create indicator variables for marital status - 3 columns (single/married/divorced)
    single = 1.0 if attList[2] == "single" else 0.0
    married = 1.0 if attList[2] == "married" else 0.0
    divorced = 1.0 if attList[2] == "divorced" else 0.0

    # education
    # create indicator variables for education - 3 columns
    primary = 1.0 if attList[3] == "primary" else 0.0
    secondary = 1.0 if attList[3] == "secondary" else 0.0
    tertiary = 1.0 if attList[3] == "tertiary" else 0.0

    # default - convert to float
    default = 0.0 if attList[4] == "no" else 1.0

    # balance - convert to float
    balance = float(attList[5])

    # loan - convert to float
    loan = 0.0 if attList[7] == "no" else 1.0

    # Filter out columns not wanted at this stage
    values = Vectors.dense([ outcome, age, single, married, \
                divorced, primary, secondary, tertiary,\
                default, balance, loan \
                ])
    return values

# Apply the transformation to every row, producing an RDD of DenseVectors
bankVectors = dataLines.map(transformToNumeric)
bankVectors.collect()[:15]
# [DenseVector([0.0, 30.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1787.0, 0.0]),
#  DenseVector([1.0, 33.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 4789.0, 1.0]),
#  DenseVector([1.0, 35.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1350.0, 0.0]),
#  DenseVector([1.0, 30.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1476.0, 1.0]),
# . . .
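
# The indicator (dummy) variables above are written out by hand; a small helper
# like this hypothetical one does the same for any categorical column:
def indicators(value, categories):
    """Return one 0/1 flag per category, in the given order."""
    return [1.0 if value == c else 0.0 for c in categories]

# indicators("married", ["single", "married", "divorced"])  ->  [0.0, 1.0, 0.0]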

# =====================================
# Perform statistical Analysis
# =====================================
from pyspark.mllib.stat import Statistics

bankStats=Statistics.colStats(bankVectors)

bankStats.mean()
# array([3.97412200e-01, 4.12698706e+01, 2.75415896e-01, 6.15526802e-01,
#       1.09057301e-01, 1.53419593e-01, 4.95378928e-01, 3.14232902e-01,
#       2.21811460e-02, 1.44478189e+03, 1.62661738e-01])
bankStats.variance()
# array([2.39919217e-01, 1.11415924e+02, 1.99931540e-01, 2.37091805e-01,
#       9.73437393e-02, 1.30122544e-01, 2.50441569e-01, 2.15889642e-01,
#       2.17293079e-02, 5.87224851e+06, 1.36455124e-01])
bankStats.min()
bankStats.max()

Statistics.corr(bankVectors)
# array([[ 1.        , -0.18232104,  0.46323285, -0.37532413, -0.0781266 ,
#        -0.12561549,  0.02639277,  0.08494841, -0.04536965,  0.03657487,
#        -0.03042059],
# . . .
# Correlations with the outcome (first row of the matrix) are low - a different
# set of columns might carry more signal
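
# Pairing the first row of the correlation matrix with the column names makes
# the feature-vs-outcome correlations easier to read (colNames is illustrative
# and must match the order used in transformToNumeric):
corrMatrix = Statistics.corr(bankVectors)
colNames = ["outcome", "age", "single", "married", "divorced",
            "primary", "secondary", "tertiary", "default", "balance", "loan"]
for name, corr in zip(colNames, corrMatrix[0]):
    print(name, round(corr, 3))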

# =====================================
# Transform DenseVector -> (label, features) tuples -> DataFrame
# Low-correlation columns could be dropped here; this example keeps all ten features
# =====================================

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

def transformToLabeledPoint(inStr):
    lp = (float(inStr[0]),
          Vectors.dense([inStr[1], inStr[2], inStr[3],
                         inStr[4], inStr[5], inStr[6], inStr[7],
                         inStr[8], inStr[9], inStr[10]]))
    return lp

# Convert each DenseVector to a (label, features) tuple
bankLp = bankVectors.map(transformToLabeledPoint)
bankLp.collect()
# [(0.0, DenseVector([30.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1787.0, 0.0])),
# (1.0, DenseVector([33.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 4789.0, 1.0])),
# (1.0, DenseVector([35.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1350.0, 0.0])),
# (1.0, DenseVector([30.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1476.0, 1.0])),

# Convert the (label, features) tuples to a SQL DataFrame
bankDF = sqlContext.createDataFrame(bankLp, ["label", "features"])
bankDF.select("label","features").show(5)
# +-----+--------------------+
# |label|            features|
# +-----+--------------------+
# |  0.0|[30.0,0.0,1.0,0.0...|
# |  1.0|[33.0,0.0,1.0,0.0...|
# |  1.0|[35.0,1.0,0.0,0.0...|
# |  1.0|[30.0,0.0,1.0,0.0...|
# |  0.0|[59.0,0.0,1.0,0.0...|
# +-----+--------------------+
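
# Sanity check (optional): the features column should be the spark.ml vector
# type; the PCA step below requires it.
bankDF.printSchema()
# expect: label as double, features as vector (ml VectorUDT)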

# =====================================
# Perform Principal Component Analysis (PCA)
# =====================================
from pyspark.ml.feature import PCA

# top 3 principal components
bankPCA = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
pcaModel = bankPCA.fit(bankDF)

# NOTE: if the vectors were built with pyspark.mllib.linalg, this fit fails with:
# IllegalArgumentException: 'requirement failed: Column features must be of type
# struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually
# struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'
# The two struct types print identically; the expected one is the ml VectorUDT and
# the actual one is the mllib VectorUDT. Recreate the pipeline from sc using
# pyspark.ml.linalg.Vectors (as done above) and the fit succeeds.

pcaResult = pcaModel.transform(bankDF).select("label","pcaFeatures")
pcaResult.show(truncate=False)
# +-----+------------------------------------------------------------+
# |label|pcaFeatures                                                 |
# +-----+------------------------------------------------------------+
# |0.0  |[-1787.018897197381,28.86209683775509,-0.06459982604832398] |
# |1.0  |[-4789.020177138492,29.922562636341418,-0.983024351309942]  |
# |1.0  |[-1350.0222131632622,34.10110809796672,0.8951427168281594]  |
# |1.0  |[-1476.0189517184558,29.05133399359654,0.39527238680255716] |
# . . .
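
# PCAModel (Spark 2.0+) reports how much variance the 3 components capture.
# Because "balance" has a far larger variance than the other columns (see
# bankStats.variance() above), it dominates the first component; a common
# refinement, not applied here, is to standardize the features first.
pcaModel.explainedVariance

# from pyspark.ml.feature import StandardScaler
# scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
#                         withMean=True, withStd=True)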

# =====================================
# Label indexing is a prerequisite for the tree-based classifiers
# (decision trees, random forests)
# =====================================
from pyspark.ml.feature import StringIndexer

stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(pcaResult)

td = si_model.transform(pcaResult)
td.collect()
# [Row(label=0.0, pcaFeatures=DenseVector([-1787.0189, 28.8621, -0.0646]), indexed=0.0),
# Row(label=1.0, pcaFeatures=DenseVector([-4789.0202, 29.9226, -0.983]), indexed=1.0),
# Row(label=1.0, pcaFeatures=DenseVector([-1350.0222, 34.1011, 0.8951]), indexed=1.0),
# Row(label=1.0, pcaFeatures=DenseVector([-1476.019, 29.0513, 0.3953]), indexed=1.0),
# . . .
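
# StringIndexer orders labels by frequency (most frequent first). Because "no"
# (0.0) is the majority class here, "indexed" happens to match "label"; the
# mapping can be checked on the fitted model:
si_model.labels        # e.g. ['0.0', '1.0']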

# =====================================
# Split into training and testing data
# =====================================
(trainingData, testData) = td.randomSplit([0.7, 0.3])

trainingData.count()    # 386
testData.count()        # 155

testData.collect()
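
# randomSplit is random, so the counts above vary from run to run. Passing a
# seed (illustrative; the original run is unseeded) makes the split reproducible:
# (trainingData, testData) = td.randomSplit([0.7, 0.3], seed=42)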

# =====================================
# Create the model
# =====================================
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rmClassifer = RandomForestClassifier(labelCol="indexed", \
                featuresCol="pcaFeatures")
rmModel = rmClassifer.fit(trainingData)
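
# The fitted model exposes a relative importance for each of the 3 PCA features
# (values not reproduced here) - a quick check of what the forest actually uses:
rmModel.featureImportances     # a vector with one weight per feature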

# Predict on the test data
predictions = rmModel.transform(testData)
predictions.select("prediction","indexed","label","pcaFeatures").collect()
# [Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-12186.027, 38.1689, -0.9686])),
# Row(prediction=1.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-10924.0305, 43.984, 0.1742])),
# Row(prediction=0.0, indexed=0.0, label=0.0, pcaFeatures=DenseVector([-9009.0251, 36.2106, 0.4038])),

evaluator = MulticlassClassificationEvaluator(predictionCol = "prediction", \
                    labelCol = "indexed", \
                    metricName = "accuracy")
# NOTE: metricName="precision" (seen in older examples) is rejected in Spark 2.x
# with "metricName given invalid value precision" - use "accuracy" instead
evaluator.evaluate(predictions)
# 0.6451612903225806 - modest accuracy: small dataset and weak correlation
# between the features and the target
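
# The same evaluator can report other multiclass metrics by switching
# metricName (values not shown here):
for metric in ["f1", "weightedPrecision", "weightedRecall"]:
    print(metric, evaluator.setMetricName(metric).evaluate(predictions))
evaluator.setMetricName("accuracy")    # restore the original setting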

# Build a confusion matrix by counting each (actual, predicted) combination
labelList = predictions.select("indexed","label").distinct().toPandas()    # distinct labels, kept for reference
predictions.groupBy("indexed","prediction").count().show()
# +-------+----------+-----+
# |indexed|prediction|count|
# +-------+----------+-----+
# |    1.0|       1.0|   22|
# |    0.0|       1.0|   15|
# |    1.0|       0.0|   40|
# |    0.0|       0.0|   78|
# +-------+----------+-----+
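
# DataFrame.crosstab gives the same confusion matrix in a pivoted layout,
# which can be easier to read (an alternative, not in the original):
predictions.crosstab("indexed", "prediction").show()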

# Accuracy is easiest to interpret with balanced classes (about 50% yes / 50% no).
# Here only ~40% of prospects are "yes" (see bankStats.mean()), so 64.5% accuracy
# is only a modest improvement over always predicting "no".

# =========================================================
# Another example
# =========================================================
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession\
         .builder\
         .appName("PCAExample")\
         .getOrCreate()

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
# NOTE: with the old pyspark.mllib.linalg Vectors this fit fails with the same
# "Column features must be of type ..." IllegalArgumentException shown earlier;
# the pyspark.ml.linalg import used here works

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
# +-----------------------------------------------------------+
# |pcaFeatures                                                |
# +-----------------------------------------------------------+
# |[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
# |[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
# |[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
# +-----------------------------------------------------------+

spark.stop()